package core

import (
	"context"
	"gitee.com/zackeus/go-zero/core/logx"
	"github.com/panjf2000/ants/v2"
	"sort"
	"sync"
	"sync/atomic"
	"time"
)

type (
	// ScheduleParser is an interface for schedule spec parsers that return a Schedule
	ScheduleParser interface {
		// Parse converts a textual spec into a Schedule, or reports why the
		// spec could not be understood.
		Parse(spec string) (Schedule, error)
	}

	// Schedule describes a job's duty cycle.
	Schedule interface {
		// Next returns the next activation time, later than the given time.
		// Next is invoked initially, and then each time the job is run.
		Next(time.Time) time.Time
	}

	// Job is an interface for submitted cron jobs.
	Job interface {
		// Run executes the job once.
		Run()
	}

	// FuncJob is a wrapper that turns a func() into a Job.
	FuncJob func()

	// byTime is a wrapper for sorting the entry array by time
	// (with zero time at the end).
	byTime []*Entry

	// EntryID identifies an entry within a Cron instance.
	// The zero value is never assigned to a scheduled entry (IDs are handed
	// out starting from 1), which is what Entry.Valid relies on.
	EntryID int

	// Entry consists of a schedule and the func to execute on that schedule.
	Entry struct {
		// ID is the cron-assigned ID of this entry, which may be used to look up a
		// snapshot or remove it.
		ID EntryID

		// Schedule on which this job should be run.
		Schedule Schedule

		// Next time the job will run, or the zero time if Cron has not been
		// started or this entry's schedule is unsatisfiable
		Next time.Time

		// Prev is the last time this job was run, or the zero time if never.
		Prev time.Time

		// WrappedJob is the thing to run when the Schedule is activated.
		WrappedJob Job

		// Job is the thing that was submitted to cron.
		// It is kept around so that user code that needs to get at the job later,
		// e.g. via Entries() can do so.
		Job Job
	}

	// Cron keeps track of any number of entries, invoking the associated func as
	// specified by the schedule. It may be started, stopped, and the entries may
	// be inspected while running.
	//
	// Field overview:
	//   - mu: taken by the public methods (Schedule, Entries, Entry, Remove,
	//     Start, Stop) around their access to the scheduler state.
	//   - ctx / stopFunc: cancellable context derived in New and its cancel
	//     function; cancelling it makes the scheduler loop return.
	//   - entries: the scheduled entries, sorted by next activation time by
	//     the scheduler loop.
	//   - chain: wraps every submitted Job to produce Entry.WrappedJob.
	//   - stop: not referenced anywhere in this file.
	//     NOTE(review): looks like a leftover from the upstream design — confirm.
	//   - add / remove / snapshot: unbuffered channels used to hand requests
	//     to the scheduler loop while it is active.
	//   - location: time zone applied to the scheduler's notion of "now".
	//   - parser: turns spec strings into Schedules for AddJob / AddFunc.
	//   - nextID: last EntryID handed out; incremented by Schedule.
	//   - concurrentSize: capacity passed to the goroutine pool in New.
	//   - jobPool: goroutine pool that executes the jobs.
	//   - running: scheduler-running flag consulted by Active.
	Cron struct {
		mu             sync.RWMutex
		ctx            context.Context
		entries        []*Entry
		chain          Chain
		stop           chan struct{}
		add            chan *Entry
		remove         chan EntryID
		snapshot       chan chan []Entry
		location       *time.Location
		parser         ScheduleParser
		nextID         EntryID
		concurrentSize uint64
		jobPool        *ants.Pool
		running        atomic.Bool
		stopFunc       func()
	}
)

// Len reports the number of entries; part of sort.Interface.
func (s byTime) Len() int {
	return len(s)
}
// Swap exchanges the entries at positions i and j; part of sort.Interface.
func (s byTime) Swap(i, j int) {
	s[j], s[i] = s[i], s[j]
}
// Less orders entries by their next activation time; part of sort.Interface.
//
// An entry whose Next is the zero time counts as "later" than any entry with
// a real time, so such entries sink to the end of the list. Two zero times
// compare as not-less in either direction.
func (s byTime) Less(i, j int) bool {
	switch {
	case s[i].Next.IsZero():
		// Zero on the left is never "less", even against another zero.
		return false
	case s[j].Next.IsZero():
		// A real time always sorts before a zero time.
		return true
	}
	return s[i].Next.Before(s[j].Next)
}

// Run invokes the wrapped function, making FuncJob satisfy Job.
func (f FuncJob) Run() {
	f()
}

// Valid reports whether e is a real entry, i.e. its ID differs from the
// zero EntryID.
func (e Entry) Valid() bool {
	const zeroID EntryID = 0
	return e.ID != zeroID
}

// New returns a new Cron job runner, modified by the given options.
//
// The supplied ctx is wrapped in a cancellable context owned by the Cron;
// cancelling either ctx or calling Stop ends the scheduler loop.
//
// Available Settings
//
//	Time Zone
//	  Description: The time zone in which schedules are interpreted
//	  Default:     time.Local
//
//	Parser
//	  Description: Parser converts cron spec strings into cron.Schedules.
//	  Default:     standardParser
//
//	Chain
//	  Description: Wrap submitted jobs to customize behavior.
//	  Default:     NewChain() with no wrappers.
//
//	Concurrency
//	  Description: Capacity of the goroutine pool that executes jobs.
//	  Default:     100
//
// See the Option functions to modify the default behavior.
//
// New returns an error only when the goroutine pool cannot be created; in
// that case no resources are left behind.
func New(ctx context.Context, opts ...Option) (*Cron, error) {
	// Derive a cancellable context so the scheduler can be shut down.
	nCtx, stop := context.WithCancel(ctx)

	c := &Cron{
		ctx:            nCtx,
		entries:        nil,
		chain:          NewChain(),
		add:            make(chan *Entry),
		snapshot:       make(chan chan []Entry),
		remove:         make(chan EntryID),
		location:       time.Local,
		parser:         standardParser,
		concurrentSize: 100,
		stopFunc:       stop,
	}
	for _, opt := range opts {
		opt(c)
	}

	// Build the goroutine pool in blocking mode (Submit waits for a free
	// worker instead of failing).
	pool, err := ants.NewPool(int(c.concurrentSize),
		ants.WithNonblocking(false),
		ants.WithPreAlloc(false),
		// Expiry interval for idle workers.
		ants.WithExpiryDuration(1*time.Hour),
		// Keep the periodic purge of idle workers enabled.
		ants.WithDisablePurge(false),
	)
	if err != nil {
		// The Cron is not returned, so nobody could ever call stopFunc:
		// cancel here to avoid leaking the derived context.
		stop()
		return nil, err
	}
	c.jobPool = pool
	c.running.Store(false)

	return c, nil
}

// Active reports whether the scheduler is flagged as running and its
// context has not been cancelled.
func (c *Cron) Active() bool {
	if !c.running.Load() {
		return false
	}
	return c.ctx.Err() == nil
}

// AddFunc registers cmd to run on the schedule described by spec.
// It wraps cmd in a FuncJob and delegates to AddJob, so spec is parsed by
// this Cron's parser. The returned ID can later be passed to Remove; a
// non-nil error means the spec could not be parsed.
func (c *Cron) AddFunc(spec string, cmd func()) (EntryID, error) {
	job := FuncJob(cmd)
	return c.AddJob(spec, job)
}

// AddJob registers cmd to run on the schedule described by spec.
// The spec is parsed with this Cron's parser; on a parse failure the error
// is returned together with the zero EntryID and nothing is scheduled.
// On success the returned ID can later be passed to Remove.
func (c *Cron) AddJob(spec string, cmd Job) (EntryID, error) {
	sched, parseErr := c.parser.Parse(spec)
	if parseErr != nil {
		return 0, parseErr
	}
	id := c.Schedule(sched, cmd)
	return id, nil
}

// Schedule adds a Job to the Cron to be run on the given schedule.
// The job is wrapped with the configured Chain, and the ID of the newly
// created entry is returned.
//
// While the scheduler is active the entry is handed to the scheduler loop
// over the add channel; otherwise it is appended to the entry list directly.
func (c *Cron) Schedule(schedule Schedule, cmd Job) EntryID {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.nextID++
	entry := &Entry{
		ID:         c.nextID,
		Schedule:   schedule,
		WrappedJob: c.chain.Then(cmd),
		Job:        cmd,
	}
	if !c.Active() {
		c.entries = append(c.entries, entry)
		return entry.ID
	}

	select {
	case c.add <- entry:
	case <-c.ctx.Done():
		// The context was cancelled after the Active check, so the
		// scheduler loop may already be gone. Without this case the send
		// would block forever while holding c.mu. Store the entry directly
		// instead.
		c.entries = append(c.entries, entry)
	}
	return entry.ID
}

// Entries returns a snapshot (a copy) of the cron entries.
//
// While the scheduler is active the snapshot is requested from the scheduler
// loop over the snapshot channel; otherwise it is taken directly.
func (c *Cron) Entries() []Entry {
	c.mu.RLock()
	defer c.mu.RUnlock()
	if c.Active() {
		// Buffered so the scheduler loop never blocks when replying.
		replyChan := make(chan []Entry, 1)
		select {
		case c.snapshot <- replyChan:
			return <-replyChan
		case <-c.ctx.Done():
			// The context was cancelled after the Active check, so the
			// scheduler loop may already be gone. Fall through to the
			// direct snapshot instead of blocking forever with c.mu held.
		}
	}
	return c.entrySnapshot()
}

// Location returns the time zone location this Cron is configured with.
func (c *Cron) Location() *time.Location {
	loc := c.location
	return loc
}

// Entry returns a snapshot of the entry with the given id, or the zero Entry
// (for which Valid reports false) if no such entry exists.
func (c *Cron) Entry(id EntryID) Entry {
	// c.mu is deliberately not taken here: Entries acquires the read lock
	// itself. Holding RLock across that call would be recursive read
	// locking, which sync.RWMutex prohibits because it deadlocks as soon as
	// a writer queues up between the two acquisitions.
	for _, entry := range c.Entries() {
		if id == entry.ID {
			return entry
		}
	}
	return Entry{}
}

// Remove an entry from being run in the future.
//
// While the scheduler is active the request is handed to the scheduler loop
// over the remove channel; otherwise the entry list is edited directly.
// Removing an unknown id is a no-op.
func (c *Cron) Remove(id EntryID) {
	c.mu.Lock()
	defer c.mu.Unlock()
	if !c.Active() {
		c.removeEntry(id)
		return
	}

	select {
	case c.remove <- id:
	case <-c.ctx.Done():
		// The context was cancelled after the Active check, so the
		// scheduler loop may already be gone. Without this case the send
		// would block forever while holding c.mu. Edit the list directly
		// instead.
		c.removeEntry(id)
	}
}

// Start launches the cron scheduler loop in its own goroutine.
//
// It is a no-op when the scheduler has already been started, or when the
// Cron's context has been cancelled (by Stop or through the parent context):
// a cancelled Cron cannot be restarted.
func (c *Cron) Start() {
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.running.Load() || c.ctx.Err() != nil {
		return
	}

	// Mark the scheduler as running before the goroutine starts. From here
	// on Active() is true, so Schedule/Remove/Entries talk to the loop over
	// the channels instead of touching c.entries themselves, and a second
	// Start cannot spawn a duplicate loop.
	c.running.Store(true)
	go c.run()
}

// Stop shuts the cron scheduler down: it cancels the scheduler context and
// releases the job pool.
//
// An optional first argument gives the maximum time to wait for the pool to
// finish; it is honoured only when it is at least one millisecond. Without
// it the pool is released immediately. Stop is a no-op unless the scheduler
// was started and not yet stopped.
func (c *Cron) Stop(t ...time.Duration) {
	c.mu.Lock()
	defer c.mu.Unlock()

	// Key on the running flag rather than Active(): if the parent context
	// was cancelled first, Active() is already false but the pool still has
	// to be released.
	if !c.running.CompareAndSwap(true, false) {
		return
	}
	c.stopFunc()

	// len(t) > 0 (not t != nil) so an empty, non-nil slice cannot panic.
	if len(t) > 0 && t[0].Milliseconds() > 0 {
		// Bounded wait for the pool to drain. The error is ignored on
		// purpose: shutdown is best-effort and there is nothing further to
		// do if the wait fails.
		_ = c.jobPool.ReleaseTimeout(t[0])
		return
	}
	c.jobPool.Release()
}

// now returns current time in c location
func (c *Cron) now() time.Time {
	return time.Now().In(c.location)
}

// run is the scheduler loop. It owns c.entries while the scheduler is
// active: other goroutines reach it only through the add, remove and
// snapshot channels. It therefore must not take c.mu, because the callers
// on the other end of those channels hold c.mu while they wait, and locking
// here could deadlock against them.
//
// The loop returns when the Cron's context is cancelled.
func (c *Cron) run() {
	// Figure out the next activation times for each entry.
	now := c.now()
	for _, entry := range c.entries {
		entry.Next = entry.Schedule.Next(now)
		logx.Debugf("[schedule] now: %v, entry: %d, next: %v", now, entry.ID, entry.Next)
	}

	for {
		// Order entries by next activation time (zero times last).
		sort.Sort(byTime(c.entries))

		var timer *time.Timer
		if len(c.entries) == 0 || c.entries[0].Next.IsZero() {
			// Nothing is due: sleep "forever", but stay responsive to
			// add/remove/snapshot/stop requests.
			timer = time.NewTimer(100000 * time.Hour)
		} else {
			timer = time.NewTimer(c.entries[0].Next.Sub(now))
		}

		// Inner loop: a snapshot request must keep waiting on the SAME
		// timer. Rebuilding the timer from the stale `now` would push the
		// next activation further into the future on every snapshot.
		for {
			select {
			case now = <-timer.C:
				now = now.In(c.location)

				// Run every entry whose next time was less than now.
				// Entries are sorted, so stop at the first one not due.
				for _, e := range c.entries {
					if e.Next.After(now) || e.Next.IsZero() {
						break
					}
					c.startJob(e.WrappedJob)
					e.Prev = e.Next
					e.Next = e.Schedule.Next(now)
					logx.Debugf("[run] now: %v, entry: %d, next: %v", now, e.ID, e.Next)
				}

			case newEntry := <-c.add:
				// Entry added while running.
				timer.Stop()
				now = c.now()
				newEntry.Next = newEntry.Schedule.Next(now)
				c.entries = append(c.entries, newEntry)
				logx.Debugf("[added] now: %v, entry: %d, next: %v", now, newEntry.ID, newEntry.Next)

			case replyChan := <-c.snapshot:
				replyChan <- c.entrySnapshot()
				continue

			case id := <-c.remove:
				// Entry removed while running.
				timer.Stop()
				now = c.now()
				c.removeEntry(id)
				logx.Debugf("[removed] entry: %d", id)

			case <-c.ctx.Done():
				timer.Stop()
				return
			}

			break
		}
	}
}

// startJob submits the given job to the goroutine pool for execution.
//
// If the pool refuses the task the job is skipped for this activation and
// the failure is logged, so a dropped run is not silent.
func (c *Cron) startJob(j Job) {
	err := c.jobPool.Submit(func() {
		j.Run()
	})
	if err != nil {
		logx.Errorf("[run] job submit failed: %v", err)
	}
}

// entrySnapshot copies every current entry by value into a fresh slice, so
// the caller can inspect the result without aliasing the scheduler's own
// Entry structs. The result is non-nil even when there are no entries.
func (c *Cron) entrySnapshot() []Entry {
	snapshot := make([]Entry, 0, len(c.entries))
	for idx := range c.entries {
		snapshot = append(snapshot, *c.entries[idx])
	}
	return snapshot
}

// removeEntry rebuilds the entry list without the entry whose ID equals id.
// The relative order of the remaining entries is preserved; an unknown id
// leaves the contents unchanged. The list becomes nil when nothing remains.
func (c *Cron) removeEntry(id EntryID) {
	var kept []*Entry
	for idx := range c.entries {
		cur := c.entries[idx]
		if cur.ID == id {
			continue
		}
		kept = append(kept, cur)
	}
	c.entries = kept
}
